package com.atguigu.gmall.realtime.test;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author Felix
 * @date 2023/12/16
 * 该案例演示了FlinkCDC
 */
public class Flink01_FlinkCDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // enable checkpoint
        env.enableCheckpointing(3000);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .databaseList("gmall0710_config") // set captured database
            .tableList("gmall0710_config.t_user") // set captured table
            .username("root")
            .password("123456")
            // .startupOptions(StartupOptions.initial())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

        //"op":"r": {"before":null,"after":{"id":2,"name":"xzls","age":30},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"gmall0710_config","sequence":null,"table":"t_user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1702694977131,"transaction":null}
        //"op":"c": {"before":null,"after":{"id":3,"name":"wls","age":20},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1702695064000,"snapshot":"false","db":"gmall0710_config","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":394,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1702695064269,"transaction":null}
        //"op":"u"： {"before":{"id":3,"name":"wls","age":20},"after":{"id":3,"name":"wls","age":40},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1702695097000,"snapshot":"false","db":"gmall0710_config","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":720,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1702695097178,"transaction":null}
        //"op":"d"： {"before":{"id":3,"name":"wls","age":40},"after":null,"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1702695125000,"snapshot":"false","db":"gmall0710_config","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":1052,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1702695125497,"transaction":null}
        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            .print();


        env.execute();
    }
}
